from unittest.mock import Mock, patch

import torch

from tests.ut.base import TestBase
from vllm_ascend.quantization.w4a8_dynamic import (
    AscendW4A8DynamicFusedMoEMethod, AscendW4A8DynamicLinearMethod)


class TestAscendW4A8DynamicLinearMethod(TestBase):

    @patch('vllm.distributed.get_tensor_model_parallel_world_size')
    @patch('vllm_ascend.quantization.w4a8_dynamic.get_current_vllm_config')
    def setUp(self, mock_get_current_vllm_config, mock_get_tp_world_size):
        mock_get_tp_world_size.return_value = 1
        mock_vllm_config = Mock()
        mock_vllm_config.quant_config = Mock(
            quant_description={"group_size": 256})
        mock_vllm_config.scheduler_config = Mock(max_num_batched_tokens=2048,
                                                 max_model_len=2048,
                                                 enable_chunked_prefill=False)
        mock_get_current_vllm_config.return_value = mock_vllm_config
        self.method = AscendW4A8DynamicLinearMethod()
        self.method.group_size = 8

    def test_get_weight(self):
        weight = self.method.get_weight(8, 32, torch.bfloat16)
        self.assertEqual(weight["weight"].dtype, torch.int8)
        self.assertEqual(weight["weight"].shape, (32, 8))
        # new quant version weight
        self.method.new_quant_version = True
        weight = self.method.get_weight(8, 32, torch.bfloat16)
        self.assertEqual(weight["weight"].dtype, torch.int8)
        self.assertEqual(weight["weight"].shape, (16, 8))
        self.assertEqual(weight["_packed_dim"], 0)
        self.assertEqual(weight["_packed_factor"], 2)

    def test_get_pergroup_param(self):
        params = self.method.get_pergroup_param(8, 32, torch.bfloat16)
        self.assertEqual(params["weight_scale"].dtype, torch.bfloat16)
        self.assertEqual(params["weight_scale"].shape, (32, 1))
        self.assertEqual(params["weight_offset"].dtype, torch.bfloat16)
        self.assertEqual(params["weight_offset"].shape, (32, 1))
        self.assertEqual(params["weight_scale_second"].dtype, torch.bfloat16)
        self.assertEqual(params["weight_scale_second"].shape, (32, 1))
        self.assertEqual(params["weight_offset_second"].dtype, torch.bfloat16)
        self.assertEqual(params["weight_offset_second"].shape, (32, 1))
        # new quant version weight
        self.method.new_quant_version = True
        params = self.method.get_pergroup_param(8,
                                                32,
                                                torch.bfloat16,
                                                layer_type="column")
        self.assertEqual(params["scale_bias"].dtype, torch.float32)
        self.assertEqual(params["scale_bias"].shape, (32, 1))
        params = self.method.get_pergroup_param(8,
                                                32,
                                                torch.bfloat16,
                                                layer_type="row")
        self.assertEqual(params["scale_bias"].dtype, torch.float32)
        self.assertEqual(params["scale_bias"].shape, (32, 16))

    @patch('torch_npu.npu_convert_weight_to_int4pack')
    @patch('torch.Tensor.npu')
    def test_process_weights_after_loading(self, mock_npu,
                                           mock_npu_convert_weight):
        mock_npu.side_effect = lambda: torch.zeros(
            (1, 32), dtype=torch.float32)
        mock_npu_convert_weight.return_value = torch.zeros((32, 4),
                                                           dtype=torch.int32)
        # old quant version weight
        layer = torch.nn.Module()
        layer.weight = torch.nn.Parameter(torch.zeros((32, 8),
                                                      dtype=torch.int8),
                                          requires_grad=False)
        layer.weight_scale = torch.nn.Parameter(torch.ones(
            (32, 1), dtype=torch.float32),
                                                requires_grad=False)
        layer.weight_offset = torch.nn.Parameter(torch.empty_like(
            layer.weight_scale.data),
                                                 requires_grad=False)
        layer.weight_scale_second = torch.nn.Parameter(torch.ones(
            (32, 1), dtype=torch.float32),
                                                       requires_grad=False)
        layer.weight_offset_second = torch.nn.Parameter(torch.empty_like(
            layer.weight_scale_second.data),
                                                        requires_grad=False)
        self.method.process_weights_after_loading(layer)
        self.assertTrue(hasattr(layer, "weight_scale_bias"))
        self.assertEqual(layer.weight_scale_bias.data.shape, (32, ))
        self.assertEqual(layer.weight_scale_bias.data.dtype, torch.float32)
        # new quant version weight
        self.method.new_quant_version = True
        new_layer = torch.nn.Module()
        new_layer.weight = torch.nn.Parameter(torch.zeros((16, 8),
                                                          dtype=torch.int8),
                                              requires_grad=False)
        new_layer.weight_scale = torch.nn.Parameter(torch.ones(
            (32, 1), dtype=torch.float32),
                                                    requires_grad=False)
        new_layer.weight_offset = torch.nn.Parameter(torch.empty_like(
            new_layer.weight_scale.data),
                                                     requires_grad=False)
        new_layer.weight_scale_second = torch.nn.Parameter(torch.ones(
            (32, 1), dtype=torch.float32),
                                                           requires_grad=False)
        new_layer.weight_offset_second = torch.nn.Parameter(
            torch.empty_like(new_layer.weight_scale_second.data),
            requires_grad=False)
        new_layer.scale_bias = torch.nn.Parameter(torch.zeros(
            (32, 1), dtype=torch.float32),
                                                  requires_grad=False)
        self.method.process_weights_after_loading(new_layer)
        self.assertEqual(new_layer.scale_bias.data.shape, (32, ))
        self.assertTrue(hasattr(new_layer, "weight_scale_second"))
        self.assertEqual(new_layer.weight_scale_second.data.shape, (1, 32))


class TestAscendW4A8DynamicFusedMoEMethod(TestBase):
    experts = 8
    input_size = 16
    output_size = 56
    group_size = 2

    @patch('vllm_ascend.quantization.w4a8_dynamic.get_ascend_config')
    @patch('vllm_ascend.quantization.w4a8_dynamic.get_current_vllm_config')
    @patch('vllm_ascend.quantization.w4a8_dynamic.get_ep_group')
    @patch('vllm_ascend.quantization.w4a8_dynamic.get_mc2_group')
    @patch('torch.distributed.get_rank', return_value=0)
    def setUp(self, mock_get_rank, mock_get_mc2_group, mock_get_ep_group,
              get_current_vllm_config, mock_get_ascend_config):
        # Mock ascend config
        mock_ascend_config = Mock()
        mock_ascend_config.dynamic_eplb = False
        mock_get_ascend_config.return_value = mock_ascend_config

        mock_vllm_config = Mock()
        mock_vllm_config.quant_config = Mock(quant_description={
            "group_size": self.group_size,
            "version": "0.0.0"
        })
        mock_vllm_config.parallel_config = Mock(enable_expert_parallel=True)
        mock_vllm_config.scheduler_config = Mock(max_num_batched_tokens=2048,
                                                 max_model_len=2048,
                                                 enable_chunked_prefill=False)
        get_current_vllm_config.return_value = mock_vllm_config
        self.quant_method = AscendW4A8DynamicFusedMoEMethod()

    def test_get_weight(self):
        # old quant version w4a8 weight
        param_dict = self.quant_method.get_weight(self.experts,
                                                  self.input_size,
                                                  self.output_size,
                                                  torch.bfloat16)
        self.assertEqual(param_dict["w13_weight"].dtype, torch.int8)
        self.assertEqual(param_dict["w13_weight"].shape,
                         (self.experts, 2 * self.input_size, self.output_size))
        # new quant version weight
        self.quant_method.new_quant_version = True
        param_dict = self.quant_method.get_weight(self.experts,
                                                  self.input_size,
                                                  self.output_size,
                                                  torch.bfloat16)
        self.assertEqual(param_dict["w13_weight"].dtype, torch.int8)
        self.assertEqual(param_dict["w13_weight"].shape,
                         (self.experts, self.input_size, self.output_size))

    def test_get_dynamic_quant_param(self):
        # old quant version weight
        param_dict = self.quant_method.get_dynamic_quant_param(
            self.experts, self.input_size, self.output_size, torch.bfloat16)
        self.assertEqual(param_dict["w13_weight_scale"].dtype, torch.float32)
        self.assertEqual(param_dict["w13_weight_scale"].shape,
                         (self.experts, 2 * self.input_size, 1))
        self.assertEqual(param_dict["w13_weight_scale_second"].dtype,
                         torch.float32)
        self.assertEqual(param_dict["w13_weight_scale_second"].shape,
                         (self.experts, 2 * self.input_size,
                          self.output_size // self.group_size))
        self.assertEqual(param_dict["w2_weight_scale"].dtype, torch.float32)
        self.assertEqual(param_dict["w2_weight_scale"].shape,
                         (self.experts, self.output_size, 1))
        self.assertEqual(param_dict["w2_weight_scale_second"].dtype,
                         torch.float32)
        self.assertEqual(param_dict["w2_weight_scale_second"].shape,
                         (self.experts, self.output_size,
                          self.input_size // self.group_size))
        # new quant version weight
        self.quant_method.new_quant_version = True
        param_dict = self.quant_method.get_dynamic_quant_param(
            self.experts, self.input_size, self.output_size, torch.bfloat16)
        self.assertEqual(param_dict["w2_scale_bias"].dtype, torch.float32)
        self.assertEqual(
            param_dict["w2_scale_bias"].shape,
            (self.experts, self.output_size, 16 // self.quant_method.tp_size))
        # per-channel weight
        self.quant_method.is_per_channel_weight = True
        param_dict = self.quant_method.get_dynamic_quant_param(
            self.experts, self.input_size, self.output_size, torch.bfloat16)
        pergroup_param = [
            "w13_weight_scale_second", "w13_weight_offset_second",
            "w2_weight_scale_second", "w2_weight_offset_second"
        ]
        is_contains = any(key in param_dict for key in pergroup_param)
        self.assertFalse(is_contains)

    def build_layer(self,
                    is_new_quant_version=True,
                    is_per_channel_weight=False):
        layer = torch.nn.Module()
        if is_new_quant_version:
            layer.w13_weight = torch.nn.Parameter(torch.zeros(
                (self.experts, self.input_size, self.output_size),
                dtype=torch.int8),
                                                  requires_grad=False)
            layer.w2_weight = torch.nn.Parameter(torch.zeros(
                (self.experts, self.output_size // 2, self.input_size),
                dtype=torch.int8),
                                                 requires_grad=False)
            w13_scale_bias = torch.zeros(
                (self.experts, 2 * self.input_size, 1), dtype=torch.float32)
            layer.w13_scale_bias = torch.nn.Parameter(w13_scale_bias,
                                                      requires_grad=False)
            w2_scale_bias = torch.zeros((self.experts, self.output_size,
                                         16 // self.quant_method.tp_size),
                                        dtype=torch.float32)
            layer.w2_scale_bias = torch.nn.Parameter(w2_scale_bias,
                                                     requires_grad=False)
        else:
            layer.w13_weight = torch.nn.Parameter(torch.zeros(
                (self.experts, 2 * self.input_size, self.output_size),
                dtype=torch.int8),
                                                  requires_grad=False)
            layer.w2_weight = torch.nn.Parameter(torch.zeros(
                (self.experts, self.output_size, self.input_size),
                dtype=torch.int8),
                                                 requires_grad=False)
        layer.w13_weight_scale = torch.nn.Parameter(torch.ones(
            (self.experts, 2 * self.input_size, 1), dtype=torch.float32),
                                                    requires_grad=False)
        layer.w2_weight_scale = torch.nn.Parameter(torch.ones(
            (self.experts, self.output_size, 1), dtype=torch.float32),
                                                   requires_grad=False)
        if not is_per_channel_weight:
            layer.w13_weight_scale_second = torch.nn.Parameter(
                torch.ones((self.experts, 2 * self.input_size,
                            self.output_size // self.group_size),
                           dtype=torch.float32),
                requires_grad=False)
            layer.w13_weight_offset_second = torch.nn.Parameter(
                torch.empty_like(layer.w13_weight_scale_second.data),
                requires_grad=False)
            layer.w2_weight_scale_second = torch.nn.Parameter(
                torch.ones((self.experts, self.output_size,
                            self.input_size // self.group_size),
                           dtype=torch.float32),
                requires_grad=False)
            layer.w2_weight_offset_second = torch.nn.Parameter(
                torch.empty_like(layer.w2_weight_scale_second.data),
                requires_grad=False)
        return layer

    @patch('torch_npu.npu_format_cast')
    @patch('torch_npu.npu_quantize')
    @patch('torch.Tensor.npu')
    def test_process_weights_after_loading(self, mock_npu, mock_npu_quantize,
                                           mock_npu_format_cast):
        mock_npu.return_value = torch.Tensor()
        mock_npu_quantize.return_value = torch.Tensor()

        def func_by_args(weight, num_format):
            return weight

        mock_npu_format_cast.side_effect = func_by_args
        # old quant version weight
        layer = self.build_layer(is_new_quant_version=False)
        self.quant_method.process_weights_after_loading(layer)
        self.assertTrue(hasattr(layer, "w13_scale_bias"))
        self.assertEqual(layer.w13_scale_bias.data.shape,
                         (self.experts, 2 * self.input_size))
        self.assertEqual(layer.w13_scale_bias.data.dtype, torch.float32)
        self.assertTrue(hasattr(layer, "w2_scale_bias"))
        self.assertEqual(layer.w2_scale_bias.data.shape,
                         (self.experts, self.output_size))
        self.assertEqual(layer.w2_scale_bias.data.dtype, torch.float32)
        # new quant version weight
        self.quant_method.new_quant_version = True
        new_layer = self.build_layer(is_new_quant_version=True)
        self.quant_method.process_weights_after_loading(new_layer)
        self.assertEqual(new_layer.w13_scale_bias.data.shape,
                         (self.experts, 2 * self.input_size))
        self.assertEqual(new_layer.w2_scale_bias.data.shape,
                         (self.experts, self.output_size))
        self.assertFalse(hasattr(new_layer, "w13_weight_scale_second"))
        # per-channel weight
        self.quant_method.is_per_channel_weight = True
        per_channel_layer = self.build_layer(is_new_quant_version=True,
                                             is_per_channel_weight=True)
        self.quant_method.process_weights_after_loading(per_channel_layer)
        self.assertEqual(new_layer.w13_scale_bias.data.shape,
                         (self.experts, 2 * self.input_size))
